package com.github.iogogogo.kafka.manager.impl;

import com.github.iogogogo.common.util.JsonParse;
import com.github.iogogogo.kafka.conf.KafkaConf;
import com.github.iogogogo.kafka.manager.KafkaService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.vavr.Tuple;
import io.vavr.Tuple3;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
 * Created by tao.zeng on 2021/3/1.
 */
@Slf4j
@Component
public class KafkaServiceImpl implements KafkaService {

    /** Poll interval used when scanning a partition for its latest record. */
    private static final Duration POLL_INTERVAL = Duration.ofMillis(200);

    @Resource
    private AdminClient adminClient;

    @Resource
    private KafkaProperties kafkaProperties;

    @Resource
    private KafkaConf kafkaConf;

    /**
     * Lists all topic names visible to the admin client, sorted alphabetically.
     *
     * @return sorted topic names; empty set when the lookup fails or times out
     */
    @Override
    public Set<String> listTopic() {
        Set<String> names = await(adminClient.listTopics().names(),
                Collections.emptySet(), "query topic list failure");
        return names.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Describes the given topics using default options.
     *
     * @return topic name -> description; empty map on failure/timeout
     */
    @Override
    public Map<String, TopicDescription> describeTopics(Collection<String> topicNames) {
        return describeTopics(topicNames, null);
    }

    /**
     * Describes the given consumer groups.
     *
     * @return group id -> description; empty map on failure/timeout
     */
    @Override
    public Map<String, ConsumerGroupDescription> describeConsumerGroups(Collection<String> groupIds) {
        DescribeConsumerGroupsResult result = adminClient.describeConsumerGroups(groupIds);
        return await(result.all(), Maps.newHashMap(), "describe consumer groups failure");
    }

    /**
     * Describes the given topics.
     *
     * @param options optional describe options; {@code null} uses the defaults
     * @return topic name -> description; empty map on failure/timeout
     */
    @Override
    public Map<String, TopicDescription> describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
        DescribeTopicsResult describeTopics = Objects.nonNull(options)
                ? adminClient.describeTopics(topicNames, options)
                : adminClient.describeTopics(topicNames);
        return await(describeTopics.all(), Maps.newHashMap(), "describe topics failure");
    }

    /** Returns whether the topic currently exists on the cluster. */
    @Override
    public boolean existsTopic(String topic) {
        return listTopic().contains(topic);
    }

    /**
     * Finds every consumer group that has at least one ACTIVE member assigned to
     * the topic. Empty groups that once consumed the topic are not reported,
     * because only live member assignments are inspected.
     *
     * @return sorted group ids currently subscribed to {@code topic}
     */
    @Override
    public List<String> topicGroups(String topic) {
        List<String> groupIds = listConsumerGroups().stream()
                .map(ConsumerGroupListing::groupId)
                .collect(Collectors.toList());
        Map<String, ConsumerGroupDescription> descriptionMap = describeConsumerGroups(groupIds);
        return descriptionMap.entrySet().stream()
                .filter(entry -> entry.getValue().members().stream()
                        .map(MemberDescription::assignment)
                        .map(MemberAssignment::topicPartitions)
                        .flatMap(Collection::stream)
                        .anyMatch(tp -> Objects.equals(tp.topic(), topic)))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    /**
     * Sums the log-end offsets of every partition of the topic.
     */
    @Override
    public Long topicEndOffset(String topic, String groupId, String reset, boolean autoCommit) {
        return topicPartitionEndOffset(topic, groupId, reset, autoCommit)
                .values().stream().mapToLong(Long::longValue).sum();
    }

    /**
     * Looks up the log-end offset of every partition of the topic.
     * The temporary consumer is always closed, even on error.
     */
    @Override
    public Map<TopicPartition, Long> topicPartitionEndOffset(String topic, String groupId, String reset, boolean autoCommit) {
        try (Consumer<String, String> consumer = buildConsumer(groupId, reset, autoCommit)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            // No records were polled, so there are no positions to commit; close()
            // drives the in-flight async commit to completion.
            if (!autoCommit) {
                consumer.commitAsync();
            }
            return offsets;
        }
    }

    /**
     * Sums the committed (current) offsets of every partition of the topic.
     */
    @Override
    public Long topicCurrentOffset(String topic, String groupId, String reset, boolean autoCommit) {
        return topicPartitionCurrentOffset(topic, groupId, reset, autoCommit)
                .values().stream().mapToLong(Long::longValue).sum();
    }

    /**
     * Looks up the committed (current) offset of every partition of the topic.
     * Partitions without a committed offset are omitted from the result
     * (the consumer reports them as {@code null}).
     */
    @Override
    public Map<TopicPartition, Long> topicPartitionCurrentOffset(String topic, String groupId, String reset, boolean autoCommit) {
        try (Consumer<String, String> consumer = buildConsumer(groupId, reset, autoCommit)) {
            Set<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toSet());

            // committed() maps partitions that have no committed offset to null;
            // skip those instead of tripping a NullPointerException on offset().
            Map<TopicPartition, OffsetAndMetadata> committed = Maps.newHashMap();
            consumer.committed(partitions).forEach((partition, metadata) -> {
                if (Objects.nonNull(metadata)) {
                    committed.put(partition, metadata);
                }
            });

            if (!autoCommit) {
                consumer.commitAsync(committed, (offsets, e) -> {
                    // Only report success when the callback carries no error.
                    if (Objects.nonNull(e)) {
                        log.error("offset commit failed", e);
                    } else {
                        log.info("offset commit completed.");
                    }
                });
            }

            Map<TopicPartition, Long> map = Maps.newHashMap();
            committed.forEach((partition, metadata) -> map.put(partition, metadata.offset()));
            return map;
        }
    }

    /**
     * Computes (currentOffset, endOffset, lag) for the topic/group.
     * NOTE: the two offsets are read with separate consumers, so on an actively
     * written topic the lag is an approximation.
     */
    @Override
    public Tuple3<Long, Long, Long> topicLag(String topic, String groupId, String reset, boolean autoCommit) {
        Long partitionOffset = topicEndOffset(topic, groupId, reset, autoCommit);
        Long currentOffset = topicCurrentOffset(topic, groupId, reset, autoCommit);
        return Tuple.of(currentOffset, partitionOffset, partitionOffset - currentOffset);
    }

    /**
     * Reads the latest committed record of every partition, polling each
     * partition until a record arrives (no poll limit).
     */
    @Override
    public List<Object> toLatestConsuming(String topic, String groupId, String reset, boolean autoCommit) {
        return toLatestConsuming(topic, groupId, reset, -1, autoCommit);
    }

    /**
     * Reads the most recently committed record of every partition of the topic
     * using a throw-away consumer group, which is deleted afterwards.
     *
     * @param timeoutSize max number of 200ms polls per partition; -1 polls until a record arrives
     * @return parsed payloads (JSON array -> List, JSON object -> Map, otherwise the raw string)
     */
    @Override
    public List<Object> toLatestConsuming(String topic, String groupId, String reset, int timeoutSize, boolean autoCommit) {
        Map<TopicPartition, Long> currentOffset = topicPartitionCurrentOffset(topic, groupId, reset, autoCommit);

        String tmpGroupId = uuid();
        log.info("tmpGroupId:{}", tmpGroupId);

        List<Object> dataList = Lists.newArrayList();
        try (Consumer<String, String> consumer = buildConsumer(tmpGroupId, OFFSET_RESET_EARLIEST, Boolean.FALSE)) {
            currentOffset.forEach((k, v) -> {
                // Nothing committed yet: there is no "latest" record, and seeking
                // to a negative offset would throw IllegalArgumentException.
                if (v <= 0) {
                    return;
                }
                TopicPartition partition = new TopicPartition(k.topic(), k.partition());
                consumer.assign(Lists.newArrayList(partition));
                consumer.seek(partition, v - 1); // re-read the last committed record

                Object data = pollFirstValue(consumer, timeoutSize, dataList);
                if (!autoCommit) {
                    consumer.commitAsync();
                }
                log.debug("partition:{} offset:{} data:{}", k.partition(), v, data);
            });
        } finally {
            // Always clean up the throw-away group, even when polling failed.
            deleteConsumerGroups(Lists.newArrayList(tmpGroupId));
        }
        return dataList;
    }

    /**
     * Creates a single topic.
     *
     * @return {@code true} when the topic was created within the configured timeout
     */
    @Override
    public boolean createTopic(String topic, int numPartitions, short replicationFactor) {
        List<NewTopic> newTopics = Lists.newArrayList(new NewTopic(topic, numPartitions, replicationFactor));
        return awaitSuccess(adminClient.createTopics(newTopics).all(), "topic [ {} ] create failure", topic);
    }

    /**
     * Creates a batch of topics.
     *
     * @return {@code true} when all topics were created within the configured timeout
     */
    @Override
    public boolean createTopic(Collection<NewTopic> newTopics) {
        String names = newTopics.stream().map(NewTopic::name).collect(Collectors.joining(","));
        return awaitSuccess(adminClient.createTopics(newTopics).all(), "topic [ {} ] create failure", names);
    }

    /**
     * Deletes the given topics.
     *
     * @return {@code true} when the deletion completed within the configured timeout
     */
    @Override
    public boolean deleteTopics(Collection<String> topics) {
        return awaitSuccess(adminClient.deleteTopics(topics).all(), "delete topics [ {} ] failure", topics);
    }

    /**
     * Describes the cluster.
     *
     * @return map with keys {@code clusterId}, {@code nodes}, {@code controller};
     *         empty when any lookup fails or times out
     */
    @Override
    public Map<String, Object> describeCluster() {
        Map<String, Object> map = Maps.newHashMap();
        try {
            DescribeClusterResult result = adminClient.describeCluster();
            String clusterId = result.clusterId().get(kafkaConf.getTimeout(), kafkaConf.getUnit());
            Collection<Node> nodes = result.nodes().get(kafkaConf.getTimeout(), kafkaConf.getUnit());
            Node controller = result.controller().get(kafkaConf.getTimeout(), kafkaConf.getUnit());
            map.put("clusterId", clusterId);
            map.put("nodes", nodes);
            map.put("controller", controller);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status for callers
            log.error("describe cluster failure", e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("describe cluster failure", e);
        }
        return map;
    }

    /**
     * Reads the configuration of the given resources (topics, brokers, ...).
     *
     * @return resource -> config; empty map on failure/timeout
     */
    @Override
    public Map<ConfigResource, Config> describeConfigs(Collection<ConfigResource> resources) {
        return await(adminClient.describeConfigs(resources).all(), Maps.newHashMap(), "describe configs failure");
    }

    /**
     * Applies incremental configuration changes.
     *
     * @return {@code true} when the alteration completed within the configured timeout
     */
    @Override
    public boolean alterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
        return awaitSuccess(adminClient.incrementalAlterConfigs(configs).all(),
                "alter configs [ {} ] failure", configs);
    }

    /**
     * Lists all consumer groups known to the cluster.
     *
     * @return mutable list of group listings; empty list on failure/timeout
     */
    @Override
    public List<ConsumerGroupListing> listConsumerGroups() {
        Collection<ConsumerGroupListing> groups = await(adminClient.listConsumerGroups().all(),
                Collections.emptyList(), "list consumer groups failure");
        return Lists.newArrayList(groups);
    }

    /**
     * Reads the committed offsets of a consumer group.
     *
     * @return partition -> committed offset metadata; empty map on failure/timeout
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
        ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupId, options);
        return await(result.partitionsToOffsetAndMetadata(), Maps.newHashMap(),
                "list consumer groups [ {} ] offset failure", groupId);
    }

    /**
     * Deletes the given consumer groups.
     *
     * @return {@code true} when the deletion completed within the configured timeout
     */
    @Override
    public boolean deleteConsumerGroups(Collection<String> groupIds) {
        log.info("delete group:{}", groupIds);
        return awaitSuccess(adminClient.deleteConsumerGroups(groupIds).all(),
                "delete consumer groups [ {} ] failure", groupIds);
    }

    /**
     * Resolves offsets (earliest/latest/timestamp) for the given partitions.
     *
     * @return partition -> offset info; empty map on failure/timeout
     */
    @Override
    public Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
        return await(adminClient.listOffsets(topicPartitionOffsets).all(), Maps.newHashMap(),
                "list offsets [ {} ] query failure", topicPartitionOffsets);
    }

    /**
     * Polls until a record arrives or {@code timeoutSize} polls have elapsed
     * ({@code -1} polls indefinitely); the first record's parsed value is
     * appended to {@code sink}.
     *
     * @return the value that was appended, or {@code null} when no record arrived
     */
    private Object pollFirstValue(Consumer<String, String> consumer, int timeoutSize, List<Object> sink) {
        for (int i = 0; i != timeoutSize; i++) {
            ConsumerRecords<String, String> records = consumer.poll(POLL_INTERVAL);
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            if (it.hasNext()) {
                Object data = parseValue(it.next().value());
                sink.add(data);
                return data;
            }
        }
        return null;
    }

    /**
     * Parses a record payload: JSON array -> List, JSON object -> Map,
     * anything else (including {@code null}) is returned unchanged.
     */
    private Object parseValue(String value) {
        if (StringUtils.startsWithIgnoreCase(value, "[")) {
            return JsonParse.parseList(value);
        }
        if (StringUtils.startsWithIgnoreCase(value, "{")) {
            return JsonParse.parseMap(value);
        }
        return value;
    }

    /**
     * Awaits a KafkaFuture with the configured timeout.
     *
     * @param fallback value returned when the future fails, times out or the thread is interrupted
     * @param message  slf4j message template logged on failure (the exception is appended automatically)
     */
    private <T> T await(KafkaFuture<T> future, T fallback, String message, Object... args) {
        try {
            return future.get(kafkaConf.getTimeout(), kafkaConf.getUnit());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status for callers
            log.error(message, withThrowable(args, e));
        } catch (ExecutionException | TimeoutException e) {
            log.error(message, withThrowable(args, e));
        }
        return fallback;
    }

    /**
     * Awaits a void KafkaFuture with the configured timeout.
     *
     * @return {@code true} on success, {@code false} on failure/timeout/interrupt
     */
    private boolean awaitSuccess(KafkaFuture<Void> future, String message, Object... args) {
        try {
            future.get(kafkaConf.getTimeout(), kafkaConf.getUnit());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status for callers
            log.error(message, withThrowable(args, e));
        } catch (ExecutionException | TimeoutException e) {
            log.error(message, withThrowable(args, e));
        }
        return false;
    }

    /**
     * Appends {@code e} to {@code args} so slf4j treats it as the throwable
     * following the message placeholders (stack trace gets logged).
     */
    private static Object[] withThrowable(Object[] args, Throwable e) {
        Object[] expanded = Arrays.copyOf(args, args.length + 1);
        expanded[args.length] = e;
        return expanded;
    }

    /**
     * Builds a string-deserializing KafkaConsumer bound to the configured brokers.
     * Callers are responsible for closing it (use try-with-resources).
     *
     * @param groupId    consumer group id
     * @param reset      auto.offset.reset policy (e.g. earliest/latest)
     * @param autoCommit whether offsets are committed automatically
     */
    private Consumer<String, String> buildConsumer(String groupId, String reset, boolean autoCommit) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaConf.getMaxFetchSize());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, reset);
        return new KafkaConsumer<>(props);
    }
}
